package com.zxt.mq.config.rabbitmq;

import com.alibaba.fastjson.JSONObject;
import com.mchange.lang.ThrowableUtils;
import com.zstk.frame.context.ZAppContextUtil;
import com.zxt.mq.config.callback.BaseConfirmCallback;
import com.zxt.mq.config.callback.BaseReturnCallback;
import com.zxt.mq.mvc.log.send.MessageSendLogService;
import com.zxt.mq.mvc.log.send.zconstant.SendLogConstant;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.nio.charset.StandardCharsets;

import javax.annotation.Resource;

import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j;

/**
 * <p>
 * rabbitmq 初始化配置类
 * </p>
 *
 * @author zxt
 * @since 2022/6/15 11:32
 */
@Configuration
@Slf4j
public class RabbitConfig {
    @Resource
    CachingConnectionFactory cachingConnectionFactory;

    @Resource
    MessageSendLogService messageSendLogService;

    @Bean
    public RabbitTemplate rabbitTemplate() {
        //若使用confirm-callback或return-callback，必须要配置publisherConfirms或publisherReturns为true
        //每个rabbitTemplate只能有一个confirm-callback和return-callback，如果这里配置了，那么写生产者的时候不能再写confirm-callback和return-callback
        //使用return-callback时必须设置mandatory为true，或者在配置中设置mandatory-expression的值为true

        // 考虑到并发性，与 validErr 消息的 次要性，这里不使用 confirm 模式 和 return 模式
        //如果使用这两个模式的话，会报异常 channelMax reached。
        // 如果后边需要这两个模式的话，再做解决，可以考虑通过Thread.sleep() 的方式，减少 channel 的积压

        // publisher-confirm-type 为三种  none 默认值 不启用发布确认模式；correlated 消息到交换器会触发确认回调方法；simple
        // 消息到交换器时会回调，其二，此类型有会关闭channel的可能，会导致后续无法发送消息
        cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        cachingConnectionFactory.setPublisherReturns(true);

        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
        //强制消息投递，消息未抵达队列时会发送消息回调ReturnCallback
        rabbitTemplate.setMandatory(true);
        //如果消息没有到exchange,则confirm回调,ack=false
        //如果消息到达exchange,则confirm回调,ack=true
        //exchange到queue成功,则不回调return
        //exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            try {
                BaseConfirmCallback baseConfirmCallback = ZAppContextUtil.getBean(BaseConfirmCallback.class);
                if (ack) {
                    baseConfirmCallback.success(correlationData, cause);
                    log.debug("消息到交换机成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
                } else {
                    if (ObjectUtil.isNotNull(correlationData)) {
                        messageSendLogService.updateLog(correlationData.getId(), SendLogConstant.SEND_FAIL, cause);
                    }
                    log.debug("消息到交换机失败:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
                    baseConfirmCallback.fail(correlationData, cause);
                }
            } catch (NoSuchBeanDefinitionException e) {
                log.debug("交换机确认回调对象未定义,{}", ThrowableUtils.extractStackTrace(e));
            }
        });
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.debug("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",
                      exchange,
                      routingKey,
                      replyCode,
                      replyText,
                      message);
            try {
                //发送失败更新发送记录
                String str = new String(message.getBody(), StandardCharsets.UTF_8);
                RabbitData rabbitData = JSONObject.parseObject(str, RabbitData.class);
                messageSendLogService.updateLog(rabbitData.getUri(), SendLogConstant.SEND_FAIL, replyText);
                //发送失败后 回调业务处理
                BaseReturnCallback baseReturnCallback = ZAppContextUtil.getBean(BaseReturnCallback.class);
                baseReturnCallback.fail(message, replyCode, replyText, exchange, routingKey);
            } catch (NoSuchBeanDefinitionException e) {
                log.debug("交换机到队列失败回调对象未定义,{}", ThrowableUtils.extractStackTrace(e));
            }
        });
        return rabbitTemplate;
    }

    @Bean
    public RabbitAdmin rabbitAdmin() {
        return new RabbitAdmin(cachingConnectionFactory);
    }
}
